
gen-ai instrumentation(feat): anthropic messages stream method instrumentation #4499

Open
eternalcuriouslearner wants to merge 20 commits into open-telemetry:main from eternalcuriouslearner:feat/anthropic-messages-stream-method-instrumentation

Conversation

@eternalcuriouslearner (Contributor)

Description

This PR adds instrumentation to the Anthropic Messages SDK's stream method.
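For context, Messages.stream() is the Anthropic SDK's streaming context manager; the instrumentation patches this call so a GenAI span covers the streamed request. A minimal usage sketch (model id and prompt are illustrative):

    from anthropic import Anthropic

    client = Anthropic()

    # Messages.stream() returns a MessageStreamManager; the new
    # instrumentation wraps this call to produce a span for the request.
    with client.messages.stream(
        model="claude-sonnet-4-5",  # illustrative model id
        max_tokens=256,
        messages=[{"role": "user", "content": "Hello"}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="")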

Fixes # (issue)

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration.

  • Added VCR-based tests to verify span creation for various scenarios; a sketch of their general shape follows below.
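A minimal sketch of what such a test looks like, assuming a pytest-recording-style @pytest.mark.vcr marker; the fixture names and the exact span name are illustrative stand-ins, not the PR's actual test code:

    import pytest


    @pytest.mark.vcr  # replays a recorded cassette instead of hitting the API
    def test_messages_stream_creates_span(anthropic_client, span_exporter):
        # anthropic_client / span_exporter are hypothetical conftest fixtures
        with anthropic_client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=256,
            messages=[{"role": "user", "content": "Hello"}],
        ) as stream:
            for _ in stream.text_stream:
                pass

        spans = span_exporter.get_finished_spans()
        assert len(spans) == 1
        # GenAI semconv names chat spans "{operation} {model}"
        assert spans[0].name == "chat claude-sonnet-4-5"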

Does This PR Require a Core Repo Change?

  • Yes. - Link to PR:
  • No.

Checklist:

See contributing.md for the style guide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated


Copilot AI left a comment


Pull request overview

Adds tracing instrumentation for the Anthropic Python SDK Messages.stream() helper in the opentelemetry-instrumentation-anthropic GenAI instrumentation, including VCR-backed tests and wrapper updates to support the new lifecycle.

Changes:

  • Add Messages.stream() patching and a new MessagesStreamManagerWrapper path to produce spans for the stream helper.
  • Refactor stream wrappers to use invocation stop()/fail() directly (instead of handler stop/fail calls).
  • Add VCR cassettes + new sync tests for Messages.stream() success/content/error/user-exception scenarios, and adjust wrapper unit tests.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.

Summary per file:

  • util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py: Import refactor for get_content_attributes usage.
  • util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_invocation.py: Import refactor for get_content_attributes usage.
  • instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py: Adds messages_stream() patch + shared invocation creation helper.
  • instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py: Updates wrappers to stop/fail via invocation; adds sync manager __enter__ failure handling.
  • instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/__init__.py: Instruments/uninstruments Messages.stream in addition to Messages.create.
  • instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_sync_messages.py: Adds new sync Messages.stream() tests (VCR + error paths).
  • instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_async_wrappers.py: Updates wrapper unit tests to match invocation stop/fail API; adds sync-manager enter failure test.
  • instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/*.yaml: New VCR recordings for Messages.stream() test cases.
  • instrumentation-genai/opentelemetry-instrumentation-anthropic/CHANGELOG.md: Notes addition of Messages.stream() instrumentation.

Comment on lines +153 to +160
    invocation, capture_content = _create_invocation(
        handler, instance, args, kwargs
    )

    try:
        return MessagesStreamManagerWrapper(
            wrapped(*args, **kwargs), invocation, capture_content
        )

Copilot AI Apr 29, 2026


messages_stream() starts an InferenceInvocation (which immediately starts a span and attaches it to the current context) before the underlying MessageStreamManager is entered. If a caller stores the manager and enters it later—or never enters it—this leaves the span/context attached for longer than the actual request (or indefinitely), which can corrupt parent/child relationships and leak context.

Consider deferring handler.start_inference(...) until the manager wrapper’s __enter__ (e.g., pass the handler + extracted params or a factory into MessagesStreamManagerWrapper and create/start the invocation inside __enter__, failing/stopping it there as needed).
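A sketch of that deferred-start shape, assuming a hypothetical invocation_factory callable that performs the _create_invocation()/start_inference() work on demand (the stream-wrapper step is elided to keep the sketch self-contained):

    class MessagesStreamManagerWrapper:
        """Defers span creation until the manager is actually entered."""

        def __init__(self, manager, invocation_factory):
            self._manager = manager
            # hypothetical: () -> (invocation, capture_content)
            self._invocation_factory = invocation_factory
            self._invocation = None

        def __enter__(self):
            # Start the span only when the request actually begins.
            self._invocation, capture_content = self._invocation_factory()
            try:
                msg_stream = self._manager.__enter__()
            except Exception as exc:
                self._invocation.fail(exc)  # close the span before propagating
                raise
            # (the real code would wrap msg_stream in the PR's stream wrapper)
            return msg_stream

        def __exit__(self, exc_type, exc_val, exc_tb):
            return self._manager.__exit__(exc_type, exc_val, exc_tb)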

Comment on lines 387 to 395
    async def __aenter__(
        self,
    ) -> AsyncMessagesStreamWrapper[ResponseFormatT]:
        msg_stream = await self._manager.__aenter__()
        self._stream_wrapper = AsyncMessagesStreamWrapper(
            msg_stream,
            self._handler,
            self._invocation,
            self._capture_content,
        )

Copilot AI Apr 29, 2026


AsyncMessagesStreamManagerWrapper.__aenter__() doesn’t handle exceptions from the wrapped manager’s __aenter__. If __aenter__ raises, the passed-in invocation span remains open/attached because invocation.fail(exc) is never called (unlike the sync manager wrapper).

Wrap the awaited self._manager.__aenter__() in a try/except, call self._invocation.fail(exc), then re-raise the original exception.
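Applied to the snippet above, the suggested guard looks roughly like this (the trailing return is inferred from the annotation; not a verbatim patch):

    async def __aenter__(
        self,
    ) -> AsyncMessagesStreamWrapper[ResponseFormatT]:
        try:
            msg_stream = await self._manager.__aenter__()
        except Exception as exc:
            # Mirror the sync manager wrapper: end the span, then propagate.
            self._invocation.fail(exc)
            raise
        self._stream_wrapper = AsyncMessagesStreamWrapper(
            msg_stream,
            self._handler,
            self._invocation,
            self._capture_content,
        )
        return self._stream_wrapper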

@lmolkova (Member) left a comment


Looks good, just some implementation-level concerns.

        )
        with self._safe_instrumentation("stop_llm"):
            self.handler.stop_llm(self.invocation)
        with self._safe_instrumentation("invocation stop"):
Member


why is it necessary?

Contributor Author


_safe_instrumentation("invocation stop") prevents telemetry cleanup failures from surfacing to users.

_stop() can run during close(), StopIteration, context-manager exit, or response-proxy close. If invocation.stop() raises due to instrumentation/exporter/completion-hook behavior, the wrapper should not replace or introduce an application exception. The guard keeps the instrumentation non-invasive: it logs at debug and preserves normal stream behavior. Let me know if you think this is overkill; I can remove it.
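For reference, the guard under discussion is roughly this shape (a sketch; the logger and the enclosing class are stand-ins, not the PR's exact code):

    import logging
    from contextlib import contextmanager

    _logger = logging.getLogger(__name__)


    class _StreamWrapperBase:
        @contextmanager
        def _safe_instrumentation(self, operation: str):
            # Log-and-continue: a telemetry failure during cleanup is logged
            # at debug rather than replacing the application's exception.
            try:
                yield
            except Exception:  # deliberately broad; never surfaces OTel errors
                _logger.debug(
                    "instrumentation failure during %s", operation, exc_info=True
                )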

Member


We never do this in other places. Instrumentation / exporter / completion hooks should never raise. If something fails: 1. it must not be OTel code; 2. we must bubble it up.

@lzchen added the gen-ai (Related to generative AI) label on Apr 30, 2026

Copilot AI left a comment


Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.


Copilot AI left a comment


Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.


Copilot AI left a comment


Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.

@lzchen (Contributor) commented on May 4, 2026

@eternalcuriouslearner

Will async be implemented as well, or will that be left to a separate PR?

@eternalcuriouslearner (Contributor, Author)

Will async be implemented as well, or will that be left to a separate PR?

Yes @lzchen, it will be in a separate PR. We already have one for that.

@eternalcuriouslearner changed the title from "feat: anthropic messages stream method instrumentation" to "gen-ai(feat): anthropic messages stream method instrumentation" on May 5, 2026
@eternalcuriouslearner changed the title from "gen-ai(feat): anthropic messages stream method instrumentation" to "gen-ai instrumentation(feat): anthropic messages stream method instrumentation" on May 5, 2026
@lmolkova (Member) left a comment

LGTM, just one concern about _safe_instrumentation("invocation stop"): I'm still not sure I understand the motivation behind it.

OTel's error-handling approach is that you should never expect an exception from an OTel component, so if something throws, it's code not related to OTel, and we should be transparent and let it bubble up.


Labels

gen-ai Related to generative AI
